%%
%% %CopyrightBegin%
%%
%% SPDX-License-Identifier: Apache-2.0
%%
%% Copyright Ericsson AB 2010-2025. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

-module(diameter_tcp).
-moduledoc """
Diameter transport over TCP.

This module implements diameter transport over TCP using `m:gen_tcp`. It can be
specified as the value of a `transport_module` option to
`diameter:add_transport/2` and implements the behaviour documented in
`m:diameter_transport`. TLS security is supported, either as an upgrade
following capabilities exchange or at connection establishment.

Note that the ssl application is required for TLS and must be started before
configuring TLS capability on diameter transports.

[](){: #start }

## SEE ALSO

`m:diameter`, `m:diameter_transport`, `m:gen_tcp`, `m:inet`, `m:ssl`
""".
-moduledoc(#{since => "OTP R14B03"}).

-dialyzer(no_improper_lists).

-behaviour(gen_server).

%% interface
-export([start/3]).

%% child start from supervisor
-export([start_link/1]).

%% child start from here
-export([init/1]).

%% gen_server callbacks
-export([handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3,
         terminate/2]).

-export([listener/1,%% diameter_sync callback
         info/1]).  %% service_info callback

%% test
-export([ports/0,
         ports/1]).

-export_type([connect_option/0,
              listen_option/0]).

-include_lib("diameter/include/diameter.hrl").

%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY,  ref).
-define(TRANSPORT_KEY, transport).

-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).

-define(DEFAULT_PORT, 3868).  %% RFC 3588, ch 2.1
-define(DEFAULT_FRAGMENT_TIMEOUT, 1000).

-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
-define(IS_TIMEOUT(N), (infinity == N orelse ?IS_UINT32(N))).

%% cb_info passed to ssl.
-define(TCP_CB(Mod), {Mod, tcp, tcp_closed, tcp_error}).

%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
%% a process owning the listening port. The monitor process
%% historically died after connection establishment, but can now live
%% on as the sender of outgoing messages, so that a blocking send
%% doesn't prevent messages from being received.

%% Listener process state.
-record(listener, {socket :: inet:socket(),
                   module :: module(),
                   service = false :: false | pid()}). %% service process

%% Monitor process state.
-record(monitor,
        {parent :: reference() | false | pid(),
         transport = self() :: pid(),
         ack = false :: boolean(),
         socket :: inet:socket() | ssl:sslsocket() | undefined,
         module :: module() | undefined}).

-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type frag()   :: maybe_improper_list(length(), binary())
                | binary().

-type connect_option() :: {raddr, inet:ip_address()}
                        | {rport, pos_integer()}
                        | {ssl_options, true | [ssl:tls_client_option()]}
                        | option()
                        | ssl:tls_client_option()
                        | gen_tcp:connect_option().

-type match() :: inet:ip_address()
               | string()
               | [match()].

-type listen_option() :: {accept, match()}
                       | {ssl_options, true | [ssl:tls_server_option()]}
                       | option()
                       | ssl:tls_server_option()
                       | gen_tcp:listen_option().

-type option() :: {port, non_neg_integer()}
                | {sender, boolean()}
                | sender
                | {message_cb, false | diameter:eval()}
                | {fragment_timer, 0..16#FFFFFFFF}.

%% Accepting/connecting transport process state.
-record(transport,
        {socket  :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
         active = false :: boolean(),           %% is socket active?
         recv   = true  :: boolean(),           %% should it be active?
         parent  :: pid(),          %% of process that started us
         module  :: module(),       %% gen_tcp-like module
         ssl     :: [term()] | boolean(),       %% ssl options, ssl or not
         frag = <<>> :: frag(),                 %% message fragment
         timeout :: infinity | 0..16#FFFFFFFF,  %% fragment timeout
         tref = false  :: false | reference(),  %% fragment timer reference
         flush = false :: boolean(),            %% flush fragment at timeout?
         message_cb  :: false | diameter:eval(),
         send        :: pid() | false}).         %% sending process

%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
%% uses this to set itself as the module to call, its start/3 just
%% calling start/3 here with the option set.

%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------

-doc """
start({Type, Ref}, Svc, [Opt]) -> {ok, Pid} | {ok, Pid, [LAddr]} | {error,
Reason}

The start function required by `m:diameter_transport`.

Options `raddr` and `rport` specify the remote address and port for a connecting
transport and are not valid for a listening transport.

Option `accept` specifies remote addresses for a listening transport and is not
valid for a connecting transport. If specified, a remote address that does not
match one of the specified addresses causes the connection to be aborted.
Multiple `accept` options can be specified. A string-valued `Match` that does
not parse as an address is interpreted as a regular expression.

Option `ssl_options` must be specified for a transport that should support TLS:
a value of `true` results in a TLS handshake immediately upon connection
establishment while `t:list/0` specifies options to be passed to `ssl:connect/2`
or `ssl:handshake/2` after capabilities exchange if TLS is negotiated.

Option `fragment_timer` specifies the timeout, in milliseconds, of a timer used
to flush messages from the incoming byte stream even if the number of bytes
indicated in the Message Length field of its Diameter Header have not yet been
accumulated: such a message is received over the transport interface after two
successive timeouts without the reception of additional bytes. Defaults to 1000.

[](){: #sender }

Option `sender` specifies whether or not to use a dedicated process for sending
outgoing messages, which avoids the possibility of send blocking reception.
Defaults to `false`. If set to `true` then a `message_cb` that avoids the
possibility of messages being queued in the sender process without bound should
be configured.

Option `message_cb` specifies a callback that is invoked on incoming and
outgoing messages, that can be used to implement flow control. It is applied to
two arguments: an atom indicating the reason for the callback (`send`, `recv`,
or `ack` after a completed send), and the message in question (binary() on
`recv`, binary() or diameter*packet record on `send` or `ack`, or `false` on
`ack` when an incoming request has been discarded). It should return a list of
actions and a new callback as tail; eg. `[fun cb/3, State]`. Valid actions are
the atoms `send` or `recv`, to cause a following message-valued action to be
sent/received, a message to send/receive (binary() or diameter_packet record),
or a boolean() to enable/disable reading on the socket. More than one
`send`/`recv`/message sequence can be returned from the same callback, and an
initial `send`/`recv` can be omitted if the same as the value passed as the
callback's first argument. Reading is initially enabled, and returning `false`
does not imply there cannot be subsequent `recv` callbacks since messages may
already have been read. An empty tail is equivalent to the prevailing callback.
Defaults to a callback equivalent to `fun(ack, *) -> []; (\_, Msg) -> [Msg]
end`.

Remaining options are any accepted by `ssl:connect/3` or `gen_tcp:connect/3` for
a connecting transport, or `ssl:listen/2` or `gen_tcp:listen/2` for a listening
transport, depending on whether or not `{ssl_options, true}` has been specified.
Options `binary`, `packet` and `active` cannot be specified. Also, option `port`
can be specified for a listening transport to specify the local listening port,
the default being the standardized 3868. Note that the option `ip` specifies the
local address.

An `ssl_options` list must be specified if and only if the transport in question
has set `Inband-Security-Id` to 1 (`TLS`), as specified to either
`diameter:start_service/2` or `diameter:add_transport/2`, so that the transport
process will receive notification of whether or not to commence with a TLS
handshake following capabilities exchange. Failing to specify an options list on
a TLS-capable transport for which TLS is negotiated will cause TLS handshake to
fail. Failing to specify TLS capability when `ssl_options` has been specified
will cause the transport process to wait for a notification that will not be
forthcoming, which will eventually cause the RFC 3539 watchdog to take down the
connection.

The first element of a non-empty `Host-IP-Address` list in `Svc` provides the
local IP address if an `ip` option is not specified. The local address is either
returned from`start/3` or passed in a `connected` message over the transport
interface.
""".
-doc(#{since => <<"OTP R14B03">>}).
-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
   -> {ok, pid(), [inet:ip_address()]}
 when Ref :: diameter:transport_ref();
           ({connect, Ref}, #diameter_service{}, [connect_option()])
   -> {ok, pid()}
 when Ref :: diameter:transport_ref().

start({T, Ref}, Svc, Opts) ->
    #diameter_service{capabilities = Caps,
                      pid = SvcPid}
        = Svc,

    diameter_tcp_sup:start(),  %% start tcp supervisors on demand
    {Mod, Rest} = split(Opts),
    Addrs = Caps#diameter_caps.host_ip_address,
    Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid},
    diameter_tcp_sup:start_child(Arg).

split([{module, M} | Opts]) ->
    {M, Opts};
split(Opts) ->
    {gen_tcp, Opts}.

%% start_link/1

-doc false.
start_link(T) ->
    proc_lib:start_link(?MODULE,
                        init,
                        [T],
                        infinity,
                        diameter_lib:spawn_opts(server, [])).

%% ---------------------------------------------------------------------------
%% # info/1
%% ---------------------------------------------------------------------------

-doc false.
info({Mod, Sock}) ->
    lists:flatmap(fun(K) -> info(Mod, K, Sock) end,
                  [{socket, fun sockname/2},
                   {peer, fun peername/2},
                   {statistics, fun getstat/2}
                   | ssl_info(Mod, Sock)]).

info(Mod, {K,F}, Sock) ->
    case F(Mod, Sock) of
        {ok, V} ->
            [{K,V}];
        _ ->
            []
    end.

ssl_info(ssl = M, Sock) ->
    [{M, ssl_info(Sock)}];
ssl_info(_, _) ->
    [].

ssl_info(Sock) ->
    [{peercert, C} || {ok, C} <- [ssl:peercert(Sock)]].

%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------

-doc false.
init(T) ->
    gen_server:enter_loop(?MODULE, [], i(T)).

%% i/1

%% A transport process.
i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
  when T == accept;
       T == connect ->
    monitor(process, Pid),
    %% Since accept/connect might block indefinitely, spawn a process
    %% that kills us with the parent until call returns, and then
    %% sends outgoing messages.
    {[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
                                             sender,
                                             message_cb,
                                             fragment_timer]),
    SslOpts = ssl_opts(SO),
    OwnOpts = lists:append(TO),
    Tmo = proplists:get_value(fragment_timer,
                              OwnOpts,
                              ?DEFAULT_FRAGMENT_TIMEOUT),
    [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
                    || K <- [message_cb, sender]],
    ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
    {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
    Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
    M = if SslOpts -> ssl; true -> Mod end,
    Sender andalso monitor(process, MPid),
    false == CB orelse (Pid ! {diameter, ack}),
    MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
    putr(?REF_KEY, Ref),
    setopts(#transport{parent = Pid,
                       module = M,
                       socket = Sock,
                       ssl = SslOpts,
                       message_cb = CB,
                       timeout = Tmo,
                       send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.

%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
    putr(?TRANSPORT_KEY, TPid),
    proc_lib:init_ack({ok, self()}),
    monitor(process, TPid),
    S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
%% gen_tcp seems to do. Links should be left to supervisors.

i({listen, Ref, {Mod, Opts, Addrs}}) ->
    [_] = diameter_config:subscribe(Ref, transport), %% assert existence
    {[LP], Rest} = proplists:split(Opts, [port]),
    {ok, LSock} = Mod:listen(get_port(LP), gen_opts(Addrs, Rest)),
    {ok, {LAddr, _}} = sockname(Mod, LSock),
    true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
    proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
    #listener{socket = LSock,
              module = Mod}.

ssl_opts([]) ->
    false;
ssl_opts([{ssl_options, true}]) ->
    true;
ssl_opts([{ssl_options, Opts}])
  when is_list(Opts) ->
    Opts;
ssl_opts(T) ->
    ?ERROR({ssl_options, T}).

%% init/8

%% Establish a TLS connection before capabilities exchange ...
init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) ->
    init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid);

%% ... or not.
init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) ->
    init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid).

%% init/7

init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
    {[Matches], Rest} = proplists:split(Opts, [accept]),
    {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
    ok = gen_server:call(LPid, {accept, SvcPid}, infinity),
    proc_lib:init_ack({ok, self(), [LAddr]}),
    Sock = ok(accept(Mod, LSock)),
    ok = accept_peer(Mod, Sock, accept(Matches)),
    publish(Mod, T, Ref, Sock),
    diameter_peer:up(Pid),
    Sock;

init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
    {[RA, RP], Rest} = proplists:split(Opts, [raddr, rport]),
    RAddr = get_addr(RA),
    RPort = get_port(RP),
    proc_lib:init_ack({ok, self()}),
    Sock = ok(connect(Mod, RAddr, RPort, gen_opts(Addrs, Rest))),
    publish(Mod, T, Ref, Sock),
    up(Pid, {RAddr, RPort}, Mod, Sock),
    Sock.

up(Pid, Remote, Mod, Sock) ->
    {Addr, _Port} = ok(sockname(Mod, Sock)),
    diameter_peer:up(Pid, Remote, [Addr]).

publish(Mod, T, Ref, Sock) ->
    true = diameter_reg:add_new({?MODULE, T, {Ref, Sock}}),
    putr(?INFO_KEY, {Mod, Sock}).  %% for info/1

ok({ok, T}) ->
    T;
ok(No) ->
    x(No).

x(Reason) ->
    exit({shutdown, Reason}).

%% accept_peer/3

accept_peer(_Mod, _Sock, []) ->
    ok;

accept_peer(Mod, Sock, Matches) ->
    {RAddr, _} = ok(peername(Mod, Sock)),
    diameter_peer:match([RAddr], Matches)
        orelse x({accept, RAddr, Matches}),
    ok.

%% accept/1

accept(Opts) ->
    [[M] || {accept, M} <- Opts].

%% listener/2

%% Accepting processes can be started concurrently: ensure only one
%% listener is started.
listener(Ref, T) ->
    diameter_sync:call({?MODULE, listener, Ref},
                       {?MODULE, listener, [{Ref, T, self()}]},
                       infinity,
                       infinity).

%% listener/1

-doc false.
listener({Ref, T, _TPid}) ->
    l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T).

%% l/3

%% Existing listening process ...
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
    {ok, LPid, AS};

%% ... or not.
l([], Ref, T) ->
    diameter_tcp_sup:start_child({listen, Ref, T}).

%% addrs/2
%%
%% Take the first address from the service if several are specified
%% and not address is configured.

addrs(Addrs, Opts) ->
    case lists:mapfoldr(fun ipaddr/2, [], Opts) of
        {Os, [_]} ->
            Os;
        {_, []} ->
            Opts ++ [{ip, A} || [A|_] <- [Addrs]];
        {_, As} ->
            ?ERROR({invalid_addrs, As, Addrs})
    end.

ipaddr({K,A}, As)
  when K == ifaddr;
       K == ip ->
    {{ip, ipaddr(A)}, [A | As]};
ipaddr(T, B) ->
    {T, B}.

ipaddr(A)
  when A == loopback;
       A == any ->
    A;
ipaddr(A) ->
    diameter_lib:ipaddr(A).

%% get_addr/1

get_addr([{_, Addr}]) ->
    diameter_lib:ipaddr(Addr);
get_addr(Addrs) ->
    ?ERROR({invalid_addrs, Addrs}).

%% get_port/1

get_port([{_, Port}]) ->
    Port;
get_port([]) ->
    ?DEFAULT_PORT;
get_port(Ps) ->
    ?ERROR({invalid_ports, Ps}).

%% gen_opts/2

gen_opts(Addrs, Opts) ->
    gen_opts(addrs(Addrs, Opts)).

%% gen_opts/1

gen_opts(Opts) ->
    {L,_} = proplists:split(Opts, [binary, packet, active]),
    [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
    [binary, {packet, 0}, {active, false} | Opts].

%% ---------------------------------------------------------------------------
%% # ports/1
%% ---------------------------------------------------------------------------

-doc false.
ports() ->
    Ts = diameter_reg:match({?MODULE, '_', '_'}),
    [{type(T), resolve(T,S), Pid} || {{?MODULE, T, {_,S}}, Pid} <- Ts].

-doc false.
ports(Ref) ->
    Ts = diameter_reg:match({?MODULE, '_', {Ref, '_'}}),
    [{type(T), resolve(T,S), Pid} || {{?MODULE, T, {R,S}}, Pid} <- Ts,
                                     R == Ref].

type(listener) ->
    listen;
type(T) ->
    T.

sock(listener, {_LAddr, Sock}) ->
    Sock;
sock(_, Sock) ->
    Sock.

resolve(Type, S) ->
    Sock = sock(Type, S),
    try
        {ok, T} = portnr(Sock),
        T
    catch
        _:_ -> Sock
    end.

%% Assume either ssl or gen_tcp. In particular, this can't deal with a
%% transport module that called start/3 with an own module option.
portnr(Sock) ->
    try
        {ok, _} = portnr(ssl, Sock)
    catch
        _:_ -> portnr(gen_tcp, Sock)
    end.

%% ---------------------------------------------------------------------------
%% # handle_call/3
%% ---------------------------------------------------------------------------

-doc false.
handle_call({accept, SvcPid}, _From, #listener{service = P} = S) ->
    {reply, ok, if not is_pid(P), is_pid(SvcPid) ->
                        monitor(process, SvcPid),
                        S#listener{service = SvcPid};
                   true ->
                        S
                end};

%% Transport is telling us of parent death.
handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
    {stop, {shutdown, Reason}, ok, S};

handle_call(_, _, State) ->
    {reply, nok, State}.

%% ---------------------------------------------------------------------------
%% # handle_cast/2
%% ---------------------------------------------------------------------------

-doc false.
handle_cast(_, State) ->
    {noreply, State}.

%% ---------------------------------------------------------------------------
%% # handle_info/2
%% ---------------------------------------------------------------------------

-doc false.
handle_info(T, #transport{} = S) ->
    {noreply, #transport{} = t(T,S)};

handle_info(T, #listener{} = S) ->
    {noreply, #listener{} = l(T,S)};

handle_info(T, #monitor{} = S) ->
    {noreply, #monitor{} = m(T,S)}.

%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------

-doc false.
code_change(_, State, _) ->
    {ok, State}.

%% ---------------------------------------------------------------------------
%% # terminate/2
%% ---------------------------------------------------------------------------

-doc false.
terminate(_, _) ->
    ok.


%% ---------------------------------------------------------------------------

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

getr(Key) ->
    get({?MODULE, Key}).

%% m/2
%%
%% Transition monitor state.

%% Outgoing message.
m(Msg, S)
  when is_record(Msg, diameter_packet);
       is_binary(Msg) ->
    send(Msg, S),
    S;

%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
    case T of
        {Sock, Mod} ->
            demonitor(S#monitor.parent, [flush]),
            S#monitor{parent = false,
                      socket = Sock,
                      module = Mod,
                      ack = Ack};
        false ->  %% monitor not sending
            x(M)
    end;

%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
    x(T);

%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
    x(T);

%% Transport is telling us that TLS has been negotiated after
%% capabilities exchange.
m({tls, SSock}, S) ->
    S#monitor{socket = SSock,
              module = ssl};

%% Transport or parent has died.
m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
                                           transport = TPid})
  when M == MRef;
       P == TPid ->
    x(T).

%% l/2
%%
%% Transition listener state. Or not anymore since any message causes
%% the process to exit.

-spec l(tuple(), #listener{})
   -> no_return().

%% Service process has died.
l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
                                              socket = Sock,
                                              module = M}) ->
    M:close(Sock),
    x(T);

%% Transport has been removed.
l({transport, remove, _} = T, #listener{socket = Sock,
                                        module = M}) ->
    M:close(Sock),
    x(T).

%% t/2
%%
%% Transition transport state.

t(T,S) ->
    case transition(T,S) of
        ok ->
            S;
        #transport{} = NS ->
            NS;
        stop ->
            x(T)
    end.

%% transition/2

%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
                                      ssl = B,
                                      frag = Frag}
                           = S)
  when P == ssl, true == B;
       P == tcp ->
    recv(acc(Frag, Bin), S);

%% Capabilities exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
                                            = S) ->
    true = is_boolean(B),  %% assert
    #transport{}
        = NS
        = tls_handshake(Type, B, S),
    Pid ! {diameter, {tls, Ref}},
    NS#transport{ssl = B};

transition({C, Sock}, #transport{socket = Sock,
                                 ssl = B})
  when C == tcp_closed, not B;
       C == ssl_closed, B ->
    stop;

transition({E, Sock, _Reason} = T, #transport{socket = Sock,
                                              ssl = B}
                                   = S)
  when E == tcp_error, not B;
       E == ssl_error, B ->
    ?ERROR({T,S});

%% Outgoing message.
transition({diameter, {send, Msg}}, #transport{} = S) ->
    message(send, Msg, S);

%% Monitor has sent an outgoing message.
transition(Msg, S)
  when is_record(Msg, diameter_packet);
       is_binary(Msg) ->
    message(ack, Msg, S);

%% Deferred actions from a message_cb.
transition({actions, Dir, Acts}, S) ->
    setopts(actions(Acts, Dir, S));

%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
                                                socket = Sock,
                                                module = M}) ->
    M:close(Sock),
    stop;

%% Timeout for reception of outstanding packets.
transition({timeout, TRef, flush}, #transport{tref = TRef} = S) ->
    flush(S#transport{tref = false});

%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock,
                                           module = M})
  when is_pid(Pid) ->
    Pid ! portnr(M, Sock),
    ok;

%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
                                                    send = MPid}) ->
    false == MPid
        orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
        orelse exit(MPid, {shutdown, parent}),
    stop;

%% Monitor process has died.
transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
  when is_pid(MPid) ->
    stop.

%% Crash on anything unexpected.

%% tls_handshake/3
%%
%% In the case that no tls message is received (eg. the service hasn't
%% been configured to advertise TLS support) we will simply never ask
%% for another TCP message, which will force the watchdog to
%% eventually take us down.

%% TLS has already been established with the connection.
tls_handshake(_, _, #transport{ssl = true} = S) ->
    S;

%% Capabilities exchange negotiated TLS but transport was not
%% configured with an options list.
tls_handshake(_, true, #transport{ssl = false}) ->
    ?ERROR(no_ssl_options);

%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
                                     module = M,
                                     ssl = Opts,
                                     send = MPid}
                          = S) ->
    {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
    Ref = getr(?REF_KEY),
    true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
    false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
    S#transport{socket = SSock,
                module = ssl};

%% Capabilities exchange has not negotiated TLS.
tls_handshake(_, false, S) ->
    S.

tls(connect, Sock, Opts) ->
    ssl:connect(Sock, Opts);
tls(accept, Sock, Opts) ->
    ssl:handshake(Sock, Opts).  %% assume no handshake option

%% recv/2
%%
%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.

%% Receive packets until a full message is received,

recv({Msg, Rest}, S) ->  %% have a complete message ...
    recv(acc(Rest), message(recv, Msg, S));

recv(Frag, #transport{recv = B,
                      socket = Sock,
                      module = M}
           = S) ->       %% or not
    B andalso setopts(M, Sock),
    start_fragment_timer(S#transport{frag = Frag,
                                     flush = false,
                                     active = B}).

%% acc/2

%% Know how many bytes to extract.
acc([Len | Acc], Bin) ->
    acc1(Len, <<Acc/binary, Bin/binary>>);

%% Or not.
acc(Head, Bin) ->
    acc(<<Head/binary, Bin/binary>>).

%% acc1/3

%% Extract a message for which we have all bytes.
acc1(Len, Bin)
  when Len =< byte_size(Bin) ->
    split_binary(Bin, Len);

%% Wait for more packets.
acc1(Len, Bin) ->
    [Len | Bin].

%% acc/1

%% Don't match on Bin since this results in it being copied at the
%% next append according to the Efficiency Guide. This is also the
%% reason that the Len is extracted and maintained when accumulating
%% messages. The simplest implementation is just to accumulate a
%% binary and match <<_, Len:24, _/binary>> each time the length is
%% required, but the performance of this decays quadratically with the
%% message length, since the binary is then copied with each append of
%% additional bytes from gen_tcp.

acc(Bin)
  when 3 < byte_size(Bin) ->
    {Head, _} = split_binary(Bin, 4),
    [_,A,B,C] = binary_to_list(Head),
    Len = (A bsl 16) bor (B bsl 8) bor C,
    if Len < 20 ->
            %% Message length isn't sufficient for a Diameter Header.
            %% Chances are things will go south from here but if we're
            %% lucky then the bytes we have extend to an intended
            %% message boundary and we can recover by simply receiving
            %% them. Make it so.
            {Bin, <<>>};
       true ->
            acc1(Len, Bin)
    end;

%% Not even 4 bytes yet.
acc(Bin) ->
    Bin.

%% bin/1

bin([_ | Bin]) ->
    Bin;

bin(Bin) ->
    Bin.

%% flush/1

%% An erroneously large message length may leave us with a fragment
%% that lingers if the peer doesn't have anything more to send. Start
%% a timer to force reception if an incoming message doesn't arrive
%% first. This won't stop a peer from sending a large bogus value and
%% following it up however but such a state of affairs can only go on
%% for so long since an unanswered DWR will eventually be the result.
%%
%% An erroneously small message length causes problems as well but
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.

%% No fragment to flush or not receiving messages.
flush(#transport{frag = <<>>} = S) ->
    S;

%% Messages have been received since last timer expiry.
flush(#transport{flush = false} = S) ->
    start_fragment_timer(S#transport{flush = true});

%% No messages since last expiry.
flush(#transport{frag = Frag} = S) ->
    message(recv, bin(Frag), S#transport{frag = <<>>}).

%% start_fragment_timer/1
%%
%% Start a timer only if there's none running and a message to flush.

start_fragment_timer(#transport{frag = B, tref = TRef} = S)
  when B == <<>>;
       TRef /= false ->
    S;

start_fragment_timer(#transport{timeout = Tmo} = S) ->
    S#transport{tref = erlang:start_timer(Tmo, self(), flush)}.

%% accept/2

accept(ssl, LSock) ->
    case ssl:transport_accept(LSock) of
        {ok, Sock} ->
            ssl:handshake(Sock);
        {error, _} = No ->
            No
    end;
accept(Mod, LSock) ->
    Mod:accept(LSock).

%% connect/4

connect(Mod, Host, Port, Opts) ->
    Mod:connect(Host, Port, Opts).

%% send/2

send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
    send1(M, Sock, Msg),
    B andalso (TPid ! Msg);

send(Msg, #transport{socket = Sock, module = M, send = false} = S) ->
    send1(M, Sock, Msg),
    message(ack, Msg, S);

%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
send(Msg, #transport{send = Pid} = S) ->
    Pid ! Msg,
    S.

%% send1/3

send1(Mod, Sock, #diameter_packet{bin = Bin}) ->
    send1(Mod, Sock, Bin);

send1(Mod, Sock, Bin) ->
    case send(Mod, Sock, Bin) of
        ok ->
            ok;
        {error, Reason} ->
            x({send, Reason})
    end.

%% send/3

send(gen_tcp, Sock, Bin) ->
    gen_tcp:send(Sock, Bin);
send(ssl, Sock, Bin) ->
    ssl:send(Sock, Bin);
send(M, Sock, Bin) ->
    M:send(Sock, Bin).

%% setopts/3

setopts(gen_tcp, Sock, Opts) ->
    inet:setopts(Sock, Opts);
setopts(ssl, Sock, Opts) ->
    ssl:setopts(Sock, Opts);
setopts(M, Sock, Opts) ->
    M:setopts(Sock, Opts).

%% setopts/1

setopts(#transport{socket = Sock,
                   active = A,
                   recv = B,
                   module = M}
        = S)
  when B, not A ->
    setopts(M, Sock),
    S#transport{active = true};

setopts(S) ->
    S.

%% setopts/2

setopts(M, Sock) ->
    case setopts(M, Sock, [{active, once}]) of
        ok -> ok;
        X  -> x({setopts, Sock, M, X})  %% possibly on peer disconnect
    end.

%% portnr/2

portnr(gen_tcp, Sock) ->
    inet:port(Sock);
portnr(M, Sock) ->
    case M:sockname(Sock) of
        {ok, {_Addr, PortNr}} ->
            {ok, PortNr};
        {error, _} = No ->
            No
    end.

%% sockname/2

sockname(gen_tcp, Sock) ->
    inet:sockname(Sock);
sockname(M, Sock) ->
    M:sockname(Sock).

%% peername/2

peername(gen_tcp, Sock) ->
    inet:peername(Sock);
peername(M, Sock) ->
    M:peername(Sock).

%% getstat/2

getstat(gen_tcp, Sock) ->
    inet:getstat(Sock);
getstat(M, Sock) ->
    M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.

%% A message_cb is invoked whenever a message is sent or received, or
%% to provide acknowledgement of a completed send or discarded
%% request. Ignoring possible extra arguments, calls are of the
%% following form.
%%
%% cb(recv, Msg)          Receive a message into diameter?
%% cb(send, Msg)          Send a message on the socket?
%% cb(ack,  Msg)          Acknowledgement of a completed send.
%% cb(ack,  false)        Acknowledgement of a discarded request.
%%
%% Msg will be binary() in a recv callback, but can be a
%% diameter_packet record in a send/ack callback if a recv/send
%% callback returns a record. Callbacks return a list of the following
%% form.
%%
%%   [boolean() | send | recv | binary() | #diameter_packet{}]
%%
%% The atoms are meaningless by themselves, but say whether subsequent
%% messages are to be sent or received. A boolean says whether or not
%% to continue reading on the socket. Messages can be received even
%% after false is returned if these arrived in the same packet. A
%% leading recv or send is implicit on the corresponding callbacks. A
%% new callback can be returned as the tail of a returned list: any
%% value not of the aforementioned list type is interpreted as a
%% callback.

%% message/3

message(send, false = M, S) ->
    message(ack, M, S);

message(ack, _, #transport{message_cb = false} = S) ->
    S;

message(Dir, Msg, #transport{message_cb = CB} = S) ->
    setopts(actions(cb(CB, Dir, Msg), Dir, S)).

%% actions/3

actions([], _, S) ->
    S;

actions([B | As], Dir, S)
  when is_boolean(B) ->
    actions(As, Dir, S#transport{recv = B});

actions([Dir | As], _, S)
  when Dir == send;
       Dir == recv ->
    actions(As, Dir, S);

actions([Msg | As], send = Dir, S)
  when is_binary(Msg);
       is_record(Msg, diameter_packet) ->
    send(Msg, actions(As, Dir, S));

actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
  when is_binary(Msg);
       is_record(Msg, diameter_packet) ->
    diameter_peer:recv(Pid, Msg),
    actions(As, Dir, S);

actions([{defer, Tmo, Acts} | As], Dir, S) ->
    erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
    actions(As, Dir, S);

actions(CB, _, S) ->
    S#transport{message_cb = CB}.

%% cb/3

cb(false, _, Msg) ->
    [Msg];

cb(CB, Dir, Msg) ->
    diameter_lib:eval([CB, Dir, Msg]).
